Spark 3.0

Language Support

Spark 3.0 moves to Python 3, and the Scala version is upgraded to 2.12. In addition, it fully supports JDK 11. Python 2.x is heavily deprecated.
Adaptive execution of Spark SQL
Before Adaptive Query Execution (AQE), Spark SQL's static optimization was good, but not perfect. Static optimization uses statistics collected at data ingestion time or via ANALYZE TABLE commands. However, it can fail to optimize the query plan accurately, particularly when those statistics are missing or stale. To overcome these limitations, AQE was introduced as an experimental feature in Apache Spark 3.0. AQE is a framework that improves the performance of Spark SQL jobs by dynamically adjusting the query execution plan based on runtime statistics of the intermediate data. Essentially, AQE reoptimizes the query plan as it gets a clearer picture of the data.
In Spark 3.0, the AQE framework is shipped with three features:
  • Dynamically coalescing shuffle partitions
  • Dynamically switching join strategies
  • Dynamically optimizing skew joins
The following sections will talk about these three features in detail.
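
Before going into the details, here is a small sketch of how these features are switched on. As far as I can tell from the Spark 3.0 configuration docs, AQE is disabled by default in 3.0 and the flags below are the relevant keys; the object name AqeSettings is only for illustration, and the `spark` variable in the comment is assumed to be an existing SparkSession.

```scala
object AqeSettings {
  // Configuration keys as documented for Spark 3.0.
  // The first one turns on the AQE framework; the other two control
  // partition coalescing and skew join handling.
  val settings: Map[String, String] = Map(
    "spark.sql.adaptive.enabled" -> "true",
    "spark.sql.adaptive.coalescePartitions.enabled" -> "true",
    "spark.sql.adaptive.skewJoin.enabled" -> "true"
  )

  // With an existing SparkSession named `spark` (an assumption here),
  // the settings could be applied like this:
  //   AqeSettings.settings.foreach { case (k, v) => spark.conf.set(k, v) }
}
```

Join strategy switching has no flag of its own in this sketch; it relies on the usual broadcast threshold, spark.sql.autoBroadcastJoinThreshold.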

Dynamically coalescing shuffle partitions
When running queries in Spark over very large data, shuffle usually has a major impact on query performance. Shuffle is an expensive operator because it needs to move data across the network so that the data is redistributed in the way required by downstream operators. One key property of shuffle is the number of partitions. The best number of partitions is data dependent, yet data sizes may differ vastly from stage to stage and from query to query, making this number hard to tune:
  • If there are too few partitions, then the data size of each partition may be very large, and the tasks to process these large partitions may need to spill data to disk (e.g., when sort or aggregate is involved) and, as a result, slow down the query.
  • If there are too many partitions, then the data size of each partition may be very small, and there will be a lot of small network data fetches to read the shuffle blocks, which can also slow down the query because of the inefficient I/O pattern. Having a large number of tasks also puts more burden on the Spark task scheduler.
To solve this problem, we can set a relatively large number of shuffle partitions at the beginning, then combine adjacent small partitions into bigger partitions at runtime by looking at the shuffle file statistics.

For example, let's say we are running the query SELECT max(i) FROM tbl GROUP BY j. The input data tbl is rather small, so there are only two partitions before grouping. The initial shuffle partition number is set to five, so after local grouping, the partially grouped data is shuffled into five partitions. Without AQE, Spark will start five tasks to do the final aggregation. However, there are three very small partitions here, and it would be a waste to start a separate task for each of them.



Instead, AQE coalesces these three small partitions into one and, as a result, the final aggregation now only needs to perform three tasks rather than five.
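
The coalescing step can be sketched in plain Scala. This is not Spark's implementation, only a greedy illustration of "combine adjacent small partitions up to a target size"; the object name, the partition sizes and the target size of 64 are all made up for the example. In Spark itself the target is, as I understand it, driven by spark.sql.adaptive.advisoryPartitionSizeInBytes.

```scala
import scala.collection.mutable.ArrayBuffer

object CoalesceSketch {
  // Groups adjacent partition indices so that each group stays at or below
  // targetSize where possible. A single partition larger than targetSize
  // still forms its own group.
  def coalesce(sizes: Seq[Long], targetSize: Long): Seq[Seq[Int]] = {
    val groups = ArrayBuffer.empty[Seq[Int]]
    var current = Vector.empty[Int]
    var currentSize = 0L
    for ((size, idx) <- sizes.zipWithIndex) {
      // Close the current group if adding this partition would exceed the target.
      if (current.nonEmpty && currentSize + size > targetSize) {
        groups += current
        current = Vector.empty[Int]
        currentSize = 0L
      }
      current = current :+ idx
      currentSize += size
    }
    if (current.nonEmpty) groups += current
    groups.toList
  }
}
```

With five shuffle partitions of sizes 60, 10, 10, 10 and 60 and a target of 64, CoalesceSketch.coalesce(Seq(60L, 10L, 10L, 10L, 60L), 64L) groups the three small partitions together and yields three groups, which corresponds to three final aggregation tasks instead of five.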

Dynamically switching join strategies
Spark supports a number of join strategies, among which broadcast hash join is usually the most performant if one side of the join fits well in memory. For this reason, Spark plans a broadcast hash join if the estimated size of a join relation is lower than the broadcast-size threshold. But a number of things can make this size estimation go wrong, such as the presence of a very selective filter, or the join relation being a series of complex operators rather than just a scan.

To solve this problem, AQE now replans the join strategy at runtime based on the most accurate join relation size. For example, if the right side of the join is found to be way smaller than the estimate and also small enough to be broadcast, then after the AQE reoptimization the statically planned sort merge join is converted to a broadcast hash join.
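
The decision itself is simple enough to sketch in plain Scala. The code below is not Spark's planner, only an illustration of the rule described above (broadcast when the relation size is below the threshold); the type names and the byte sizes in the usage note are hypothetical.

```scala
object JoinStrategySketch {
  sealed trait JoinStrategy
  case object BroadcastHashJoin extends JoinStrategy
  case object SortMergeJoin extends JoinStrategy

  // Picks a broadcast hash join when the relation is below the broadcast
  // threshold, otherwise falls back to a sort merge join.
  def choose(relationBytes: Long, broadcastThresholdBytes: Long): JoinStrategy =
    if (relationBytes < broadcastThresholdBytes) BroadcastHashJoin
    else SortMergeJoin
}
```

Suppose the threshold is 10 MB, the static estimate for the right side is 100 MB, and the size measured at runtime is 8 MB. Calling choose with the estimate gives SortMergeJoin, while calling it again with the runtime size gives BroadcastHashJoin, which is the switch AQE makes once the real size is known.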






For the broadcast hash join converted at runtime, we may further optimize the regular shuffle to a localized shuffle (i.e., shuffle that reads on a per mapper basis instead of a per reducer basis) to reduce the network traffic.
Dynamically optimizing skew joins
Data skew occurs when data is unevenly distributed among partitions in the cluster. Severe skew can significantly degrade query performance, especially with joins. AQE skew join optimization detects such skew automatically from shuffle file statistics. It then splits the skewed partitions into smaller subpartitions, each of which is joined to the corresponding partition from the other side.

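Let's take the example of table A joined with table B, in which table A has a partition A0 significantly bigger than its other partitions. The skew join optimization splits A0 into subpartitions and joins each of them to the corresponding partition B0 of table B.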
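
To make the detect-and-split idea concrete, here is a rough plain-Scala sketch. It assumes a partition counts as skewed when it is both several times larger than the median partition and larger than an absolute minimum size, which is my reading of how Spark's skew join settings work; the names and all numeric values are hypothetical, not Spark defaults.

```scala
object SkewSketch {
  // Hypothetical tuning knobs for this sketch (sizes in arbitrary units).
  final case class SkewParams(factor: Long, minSkewedSize: Long, targetSize: Long)

  // Median of the partition sizes; assumes a non-empty sequence.
  private def median(sizes: Seq[Long]): Long = {
    val sorted = sizes.sorted
    sorted(sorted.length / 2)
  }

  // Indices of partitions treated as skewed under the assumed rule.
  def skewedPartitions(sizes: Seq[Long], p: SkewParams): Seq[Int] = {
    val m = median(sizes)
    sizes.zipWithIndex.collect {
      case (size, idx) if size > p.factor * m && size > p.minSkewedSize => idx
    }
  }

  // Number of subpartitions needed so that each is at most targetSize.
  def splitCount(size: Long, p: SkewParams): Int =
    ((size + p.targetSize - 1) / p.targetSize).toInt
}
```

For table A with partition sizes 500, 40, 50 and 60, and parameters SkewParams(5, 256, 128), only partition 0 (A0) is flagged, and it would be split into 4 subpartitions, each joined against the matching partition of table B.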

Dynamic Partition Pruning
Spark 3.0 introduces Dynamic Partition Pruning (DPP), a major performance improvement for SQL analytics workloads that in turn can make integration with BI tools much better. The idea behind DPP is to take the filter set on the dimension table (usually small and used in a broadcast hash join) and apply it directly to the fact table, so that Spark can skip scanning unneeded partitions.
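
The idea can be illustrated without Spark at all. The sketch below models the fact table only by its partition keys and the dimension table as (joinKey, attribute) pairs; these shapes, the object name and the sample data are assumptions made for the example. In Spark, as far as I know, the feature is controlled by spark.sql.optimizer.dynamicPartitionPruning.enabled.

```scala
object DppSketch {
  // Returns the fact table partitions that actually need to be scanned.
  def partitionsToScan(
      factPartitionKeys: Set[String],
      dimensionRows: Seq[(String, String)], // (joinKey, attribute)
      dimensionFilter: String => Boolean): Set[String] = {
    // Step 1: evaluate the filter on the small dimension table.
    val matchingKeys = dimensionRows.collect {
      case (key, attribute) if dimensionFilter(attribute) => key
    }.toSet
    // Step 2: reuse the surviving join keys as a partition filter on the
    // fact table, so all other partitions are never read.
    factPartitionKeys.intersect(matchingKeys)
  }
}
```

With fact partitions us, ca, de and fr, a dimension table mapping us and ca to "Americas" and de to "Europe", and the filter attribute == "Europe", only the de partition is scanned.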

Binary files data source

Spark 3.0 supports a binary file data source. You can use it like this:

val df = spark.read.format("binaryFile").load("/path/to/files") // placeholder path, adjust to your data

The above reads binary files and converts each one to a single row that contains the raw content and metadata of the file. The DataFrame will contain the following columns and possibly partition columns:

  • path: StringType
  • modificationTime: TimestampType
  • length: LongType
  • content: BinaryType

Writing back a binary DataFrame/RDD is currently not supported.
